Flume 集群部署(Docker)
Flume 简介
Flume 是一款分布式、可靠、高可用的海量日志采集系统,由 source、channel、sink 三要素组成。在 Flume 中,一个最基本的数据采集流程如下图所示:
准备工作
本文目的在于通过 flume、zookeeper、kafka、docker 搭建基本的日志采集系统,实现日志的采集并将数据发送至 Kafka。搭建前,需确保已安装以下组件:
- JDK 8
- Flume
- Zookeeper
- Kafka
- Docker
同时需确保已掌握以下技能:
- 能在本地部署并成功启动 Flume,且了解 Flume 的核心概念
source
、channel
、sink
- 能基本使用 Zookeeper,了解节点的创建、更新、查看与删除
- 能基本使用 Kafka,了解 topic 的概念,实现消息的创建与消费
- 了解 Docker,能基于
Dockerfile
创建镜像并启动容器
在本系统中,各组件职责分配如下:
组件名 | 组件职责 |
---|---|
Flume | 负责日志采集 |
Zookeeper | 负责配置管理 |
Kafka | 负责数据接收 |
Flume 配置
日志采集系统搭建的第一步,是完成 Flume 的配置。
在进行 Flume 配置前,我们首先假设面临如下需求:
- 数据来源为 Thrift
- 要求不同的日志源发送往相应的 kafka 队列
在此基础上,我们设计 Flume 配置文件 flume-docker.conf
如下:
# 配置一个 agent,agent 的名称可以自定义(如:docker)
# 指定 agent 的 sources(如:s1)、sinks(如:k1)、channels(如:c1)
docker.sources = s1
docker.sinks = k1 k2
docker.channels = c1 c2
# 配置 source
docker.sources.s1.channels = c1 c2
docker.sources.s1.type = thrift
docker.sources.s1.bind = 0.0.0.0
docker.sources.s1.port = 9999
docker.sources.s1.selector.type = multiplexing
docker.sources.s1.selector.header = type
docker.sources.s1.selector.mapping.panda_1 = c1
docker.sources.s1.selector.mapping.panda_2 = c2
# 配置 channels
docker.channels.c1.type = file
docker.channels.c1.dataDirs = /tmp/data/flume/c1/data
docker.channels.c1.checkpointDir = /tmp/data/flume/c1/checkpoint
docker.channels.c2.type = file
docker.channels.c2.dataDirs = /tmp/data/flume/c2/data
docker.channels.c2.checkpointDir = /tmp/data/flume/c2/checkpoint
# 配置 sinks
docker.sinks.k1.channel = c1
docker.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
docker.sinks.k1.kafka.bootstrap.servers = 192.168.117.128:9092
docker.sinks.k1.kafka.topic = panda_1
docker.sinks.k1.kafka.producer.acks = -1
docker.sinks.k2.channel = c2
docker.sinks.k2.type = org.apache.flume.sink.kafka.KafkaSink
docker.sinks.k2.kafka.bootstrap.servers = 192.168.117.128:9092
docker.sinks.k2.kafka.topic = panda_2
docker.sinks.k2.kafka.producer.acks = -1
该配置文件主要是通过以下 3 点满足上述假设的需求:
- 使用 Flume Selector,通过
multiplexing
实现将多数据源发送至多管道 - 通过 Selector 中的
header
和mapping
决定不同的数据源分别发送至哪个channel
- 配置多个
sink
,分别对应各个channel
,并将其发送至指定的 kafka 队列
Zookeeper 管理
日志采集系统搭建的第二步,是将上述配置文件 flume-docker.conf
交由 Zookeeper 管理。
为此,我们首先要启动 Zookeeper 服务。在本例中,我们使用单节点模式,启动命令如下:
cd ${zk_home}/bin
./zkServer.sh start
然后,我们需要将上述配置文件 flume-docker.conf
的内容添加至 Zookeeper 的节点。
在本例中,此部分通过 Java API 实现,具体步骤如下:
STEP 01:引入 Zookeeper 客户端框架 Curator
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>2.12.0</version>
</dependency>
STEP 02:编写代码实现 Zookeeper 节点创建
首先,启动客户端连接:
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
client = CuratorFrameworkFactory.builder()
.connectString(ip + ":" + port)
.sessionTimeoutMs(10000)
.connectionTimeoutMs(10000)
.retryPolicy(retryPolicy)
.build();
client.start();
然后,编写节点创建方法:
client.create().creatingParentContainersIfNeeded()
.withMode(CreateMode.PERSISTENT)
.forPath(path, data.getBytes(Charsets.UTF_8.name()));
最后,将配置文件 flume-docker.conf
发送至 Zookeeper 的 /flume/docker
节点:
ClientDemo clientDemo = new ClientDemo("192.168.x.x", 2181);
String filePath = ClientDemo.class.getClassLoader().getResource("").getPath() + "flume-docker.conf";
String content = FileUtils.readFileToString(new File(filePath), Charsets.UTF_8);
clientDemo.createPersistentNode("/flume/docker", content);
Kafka 启动
日志采集系统搭建的第三步,是启动 Kafka,用于接收 Flume 传输过来的日志。
在本例中,仅启动单节点 Kafka,启动命令如下:
${kafka_home}/bin/kafka-server-start.sh -daemon ${kafka_home}/config/server.properties
Docker 镜像
完成了前面的三个步骤后,我们来到了系统搭建的最后一部分,使用 Docker 构建镜像。
具体构建步骤如下:
STEP 01:创建本地构建的上下文目录
cd /usr/local/docker
mkdir flume-app
STEP 02:传输所需文件至构建目录下
在本例中,所需文件列表如下:
- apache-flume-1.9.0-bin.tar.gz
- jdk-8u251-linux-x64.tar.gz
- zookeeper-3.4.8.jar
STEP 03:创建并编写 Dockerfile
本例的 Dockerfile
如下所示:
FROM centos
MAINTAINER panda "pentaxie@qq.com"
# install java
ADD jdk-8u251-linux-x64.tar.gz /usr/java
RUN cp /etc/profile /etc/profile.bak
ENV JAVA_HOME /usr/java/jdk1.8.0_251
ENV PATH $JAVA_HOME/bin:$PATH
ENV CLASSPATH .:$JAVA_HOME/lib
# install Flume
ADD apache-flume-1.9.0-bin.tar.gz /usr/local
WORKDIR /usr/local
RUN mv apache-flume-1.9.0-bin flume
# add lib
ADD zookeeper-3.4.8.jar /usr/local/flume/lib
EXPOSE 19999
ENTRYPOINT (/usr/local/flume/bin/flume-ng agent -n docker -c /usr/local/flume/conf -z zkhost:2181 -p /flume -Dflume.root.logger=DEBUG,console)
在编写 Dockerfile
时,需注意以下几点:
- 必须引入 Zookeeper 依赖包,否则启动 Flume 时会抛出异常
- Flume 启动命令中
agent
的名称需与配置文件中的agent
保持一致,否则 Flume 无法找到agent
的配置 - Flume 启动命令中的 Zookeeper 目录(
-p
所指目录)要与上述配置文件flume-docker.conf
所处目录一致,即/flume
STEP 04:构建镜像
docker build -t flume-docker .
STEP 05:启动容器
docker run --add-host zkhost:192.168.x.x --add-host master:192.168.x.x -p 19999:9999 flume-docker
启动容器时,需要注意以下几点:
- 需使用
--add-host
设置 Zookeeper 和 Kafka 所处主机的ip
地址,否则启动时会抛出连接异常 - 需使用
-p
映射容器中 thrift 的监听端口,否则外部客户端无法调用
Thrift 客户端调用
至此,日志采集系统搭建步骤已经全部完成。
现在,我们可以使用 thrift 客户端发送数据,看看系统的实现效果如何。
在本例中,thrift 客户端由 Java 实现。
首先,引入依赖包:
<dependency>
<groupId>org.apache.flume</groupId>
<artifactId>flume-ng-core</artifactId>
<version>1.9.0</version>
</dependency>
然后,编写客户端代码,实现将数据发送至 thrift 监听端口:
public void writeData(String data) {
Map<String, String> headers = Maps.newHashMap();
headers.put("type", "panda_1");
Event event = EventBuilder.withBody(data, Charsets.UTF_8);
event.setHeaders(headers);
try {
client.append(event);
} catch (EventDeliveryException e) {
e.printStackTrace();
} finally {
client.close();
}
}
public void init(String hostname, Integer port) {
this.client = RpcClientFactory.getThriftInstance(hostname, port);
}
public static void main(String[] args) {
String data = "hello, dianchu";
ClientDemo clientDemo = new ClientDemo();
clientDemo.init("192.168.x.x", 19999);
clientDemo.writeData(data);
}
此处,需要注意以下几点:
- 代码中的
19999
端口指向 Docker 容器中的9999
端口 - 在往
Event
写入数据时,要在请求头中设置type
值,该值要处于flume-docer.conf
文件中mapping
的取值范围内
最后,执行 main
方法,并到 Kafka 中查看 panda_1
队列的情况:
cd ${kafka_home}/bin
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic panda_1 --from-beginning
hello, dianchu
控制台成功输出预期结果,说明日志采集系统正常运行。